Vamos a crear un punto de entrada al API de dataframes y dataset.
In [2]:
from pyspark.sql import SparkSession
In [2]:
spark= SparkSession.builder.appName("Trabajando con Spark SQL").getOrCreate()
Lo primero que vamos a leer va a ser un fichero json que representa la tabla periodica y lo vamos a almacenar en un dataframe sobre el que vamos a ir realizando diferentes acciones como si se tratara de un RDD.
Nota: el formato json en spark SQL es un formato por línea , como si fuera un CSV, por lo tanto, hay que transformar el listado de objetos en un una fila por cada objeto.
In [3]:
import json
with open('sql/PeriodicTableJSON.json') as data_file:
data = json.load(data_file)
with open('sql/PeriodicTableJSON.jsonl', 'w') as outfile:
for entry in data:
json.dump(entry, outfile)
outfile.write('\n')
In [4]:
df = spark.read.json("sql/PeriodicTableJSON.jsonl")
In [5]:
df.show()
In [6]:
df.printSchema()
In [7]:
df.select("name").show()
Seleccionamos los elementos químicos que tengan la masa atómica menor que 200 y mostramos los 10 primeros.
In [8]:
df.select(df['name'],df['atomic_mass']).filter(df['atomic_mass']<200).show(10)
In [9]:
df.groupBy('phase').count().show()
Ahora vamos a ver como a partir de un dataframe podemos generar una tabla temporal sobre la que ejecutaremos sentencias en SQL.
In [10]:
df.createGlobalTempView("chemistryTable")
In [11]:
spark.sql("select name from global_temp.chemistryTable").show(5)
Como ya hemos comentado en el post, python no permite construir estructuras de dataset. Para que te hagas una idea si vienes del mundo Java o Scala. La creación de dataset se basa en la definición de una clase y permite añadir objetos de esa clase. El resultado es una estructura en formato de tabla como el dataframe mostrado en nuestro caso.
En Spark SQL, existen dos formas de inferir el esquema un dataframe. Una es mediante reflexión y la otra es explicitamente con programación. A continuación vamos a ver ambos casos sobre un documento txt que contiene el elemento químico y su masa atómica.
In [12]:
from pyspark.sql import Row
sc = spark.sparkContext
lines=sc.textFile("sql/Periodictable.txt")
parts= lines.map(lambda p: p.split(","))
elements= parts.map(lambda e: Row(name=e[0],atomic_mass=float(e[1])))
In [13]:
schemeElements=spark.createDataFrame(elements)
schemeElements.createOrReplaceTempView("elements")
In [14]:
lightElements=spark.sql("select name from elements where atomic_mass>0 and atomic_mass<21")
In [15]:
lightElemName=lightElements.rdd.map(lambda elem: "Name: "+elem.name).collect()
for name in lightElemName:
print(name)
Ahora vamos a ver como se haría programáticamente.
In [16]:
from pyspark.sql.types import *
sc=spark.sparkContext
lines=sc.textFile("sql/Periodictable.txt")
parts=lines.map(lambda line: line.split(","))
elements= parts.map(lambda p: (p[0],p[1]))
schemeString="name atomicMass"
fields= [StructField(field_name,StringType(),True) for field_name in schemeString.split()]
scheme =StructType(fields)
schemeElements= spark.createDataFrame(elements,scheme)
schemeElements.createOrReplaceTempView("elements")
spark.sql("select name,atomicMass from elements").show()
schemeElements.printSchema()
Existen multitud de formatos disponible en Spark SQL (json,parquet,jdbc,orc,libsvm,csv,text,...) aunque el formato por defecto es parquet. En este apartado vamos a ver el manejo de diferentes formatos de datos y la comunicación con Hive, Parquet y JDBC para guardar/recuperar información.
Lo primero que vamos hacer es escribir la query anterior a un fichero parquet para despues hacer queries directamente sobre el fichero.
In [17]:
schemeElements.select("name","atomicMass").write.save("sql/namesAndAtomicMass.parquet")
Y ahra podemos hacer una query directamente desde el fichero.
In [8]:
newDf= spark.sql("select atomicMass from parquet.`sql/namesAndAtomicMass.parquet`")
In [9]:
newDf.show()
Cuando has arrancado el cluster de docker, en la base de datos hemos añadido una setencia books.sql alojada en notebook/initdb que carga dentro del esquema postgres la tabla libros con title author y año
In [ ]:
jdbcDF = spark.read \
.format("jdbc") \
.option("url", "jdbc:postgresql://db:5432/postgres") \
.option("dbtable", "books") \
.option("user", "postgres") \
.option("driver", "org.postgresql.Driver") \
.option("password", "root") \
.load()
In [3]:
jdbcDF.show();
In [4]:
jdbcDF.printSchema();